package com.atguigu.gmall.realtime.utils;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

/**
 * Factory methods for Flink Kafka producers and Flink SQL Kafka connector DDL
 * used by the real-time warehouse jobs.
 */
public class MyKafkaPro {

    /** Kafka broker list shared by every producer / DDL snippet built here. */
    private static final String BOOTSTRAP_SERVERS =
            "hadoop104:9092,hadoop105:9092,hadoop106:9092";

    /**
     * Fallback topic required by the {@link FlinkKafkaProducer} constructor;
     * the {@link KafkaSerializationSchema} chooses the real target topic per record.
     */
    private static final String DEFAULT_TOPIC = "default_topic";

    /** Utility class — no instances. */
    private MyKafkaPro() {
    }

    /**
     * Builds a plain String producer that writes every record to one fixed topic.
     *
     * @param topic target Kafka topic
     * @return producer serializing records with {@link SimpleStringSchema}
     */
    public static FlinkKafkaProducer<String> getFlinkKafkaProducer(String topic) {
        return new FlinkKafkaProducer<>(BOOTSTRAP_SERVERS, topic, new SimpleStringSchema());
    }

    /**
     * Builds a producer whose serialization schema routes each record to its own
     * topic — used to fan out ods_base_db_log fact data to per-table topics.
     *
     * @param kafkaSerializationSchema per-record serializer; decides topic and payload
     * @param <T>                      record type
     * @return producer configured with AT_LEAST_ONCE delivery semantics
     */
    public static <T> FlinkKafkaProducer<T> getFlinkKafkaSink(
            KafkaSerializationSchema<T> kafkaSerializationSchema) {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // Transaction timeout: 15 minutes. Only relevant for transactional
        // (EXACTLY_ONCE) writes and must not exceed the broker's
        // transaction.max.timeout.ms.
        properties.setProperty(
                ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, String.valueOf(15 * 60 * 1000));
        return new FlinkKafkaProducer<>(
                DEFAULT_TOPIC,
                kafkaSerializationSchema,
                properties,
                FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
    }

    /**
     * Renders the Kafka connector options for a Flink SQL {@code WITH (...)} clause.
     *
     * @param topic Kafka topic to read from
     * @param group consumer group id
     * @return comma-separated connector options (json format, latest-offset startup)
     */
    public static String getKafkaDDL(String topic, String group) {
        return "'connector' = 'kafka'," +
                "'topic' = '" + topic + "'," +
                "'properties.bootstrap.servers' = '" + BOOTSTRAP_SERVERS + "'," +
                "'properties.group.id' = '" + group + "'," +
                " 'format' = 'json'," +
                " 'scan.startup.mode' = 'latest-offset' ";
    }
}
